{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# From image idea to Kibana dashboard using AI\n",
    "\n",
    "This notebook is based on the article [From image idea to Kibana dashboard using AI](https://www.elastic.co/search-labs/blog/from-image-idea-to-kibana-dashboard-using-ai). With the following code, we can generate a Kibana dashboard from an image.\n",
    "\n",
    "Note: This notebook was done to be executed in Google Colab."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Install dependencies"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install elasticsearch pydantic langchain langchain-openai -q"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 115,
   "metadata": {},
   "outputs": [],
   "source": [
    "import requests, time, os, base64, json, uuid, urllib.parse\n",
    "from IPython.display import Image, display\n",
    "from getpass import getpass\n",
    "from typing import Any, Dict, List, Literal\n",
    "\n",
    "from google.colab import files\n",
    "\n",
    "from elasticsearch import Elasticsearch\n",
    "from langchain.chat_models import init_chat_model\n",
    "from pydantic import BaseModel, Field"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Defining the dashboard schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 105,
   "metadata": {},
   "outputs": [],
   "source": [
    "os.environ[\"OPENAI_API_KEY\"] = getpass(\"Enter your OpenAI API key: \")\n",
    "os.environ[\"ELASTICSEARCH_API_KEY\"] = getpass(\"Enter your Elasticsearch API key: \")\n",
    "os.environ[\"ELASTICSEARCH_URL\"] = getpass(\"Enter your Elasticsearch URL: \")\n",
    "os.environ[\"KIBANA_URL\"] = getpass(\"Enter your Kibana URL: \")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Defining the dashboard schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 106,
   "metadata": {},
   "outputs": [],
   "source": [
    "class Visualization(BaseModel):\n",
    "    title: str = Field(description=\"The dashboard title\")\n",
    "    type: List[Literal[\"pie\", \"bar\", \"metric\"]]\n",
    "    field: str = Field(\n",
    "        description=\"The field that this visualization use based on the provided mappings\"\n",
    "    )\n",
    "\n",
    "\n",
    "class Dashboard(BaseModel):\n",
    "    title: str = Field(description=\"The dashboard title\")\n",
    "    visualizations: List[Visualization]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Loading the json templates\n",
    "\n",
    "There are 3 templates for each visualization type:\n",
    "- pie\n",
    "- bar\n",
    "- metric\n",
    "\n",
    "The templates are in the following format:\n",
    "- insBar.json\n",
    "- insPie.json\n",
    "- insMetric.json\n",
    "\n",
    "You can find the templates in the `/templates` folder on the repository. Download the templates and upload them using the following code (Click on the upload button and load all the files together)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "uploaded = files.upload()\n",
    "\n",
    "template_dir = \"templates\"\n",
    "os.makedirs(template_dir, exist_ok=True)\n",
    "\n",
    "for filename in uploaded.keys():\n",
    "    with open(os.path.join(template_dir, filename), \"wb\") as f:\n",
    "        f.write(uploaded[filename])\n",
    "\n",
    "templates = {}\n",
    "for vis_type in [\"pie\", \"bar\", \"metric\"]:\n",
    "    template_file = os.path.join(template_dir, f\"lns{vis_type.capitalize()}.json\")\n",
    "\n",
    "    with open(template_file, \"r\") as f:\n",
    "        templates[vis_type] = json.load(f)\n",
    "\n",
    "    if not templates:\n",
    "        print(\"No templates found\")\n",
    "        break\n",
    "\n",
    "    print(f\"Loaded {len(templates)} templates\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Defining the dashboard schema"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Retrieve index mappings for the index that the dashboard is based on."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 94,
   "metadata": {},
   "outputs": [],
   "source": [
    "INDEX_NAME = \"kibana_sample_data_logs\"\n",
    "\n",
    "es_client = Elasticsearch(\n",
    "    [os.getenv(\"ELASTICSEARCH_URL\")],\n",
    "    api_key=os.getenv(\"ELASTICSEARCH_API_KEY\"),\n",
    ")\n",
    "\n",
    "result = es_client.indices.get_mapping(index=INDEX_NAME)\n",
    "index_mappings = result[list(result.keys())[0]][\"mappings\"][\"properties\"]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Loading image \n",
    "You can load an image from your local machine and use it to generate a dashboard. In the `/imgs` folder on the repository, you can find images to use as examples."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [],
   "source": [
    "uploaded = files.upload()\n",
    "\n",
    "IMAGE_PATH = next(iter(uploaded.keys()))\n",
    "image_base64 = base64.b64encode(open(IMAGE_PATH, \"rb\").read()).decode(\"utf-8\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "prompt = f\"\"\"\n",
    "    You are an expert in analyzing Kibana dashboards from images for the version 9.0.0 of Kibana.\n",
    "\n",
    "    You will be given a dashboard image and a Elasticsearch index mappings.\n",
    "\n",
    "    Below is the index mappings for the index that the dashboard is based on. Use this to help you understand the data and the fields that are available.\n",
    "\n",
    "    Index Mappings:\n",
    "    {index_mappings}\n",
    "\n",
    "    Only include the fields that are relevant for each visualization, based on what is visible in the image.\n",
    "    \"\"\"\n",
    "\n",
    "message = [\n",
    "    {\n",
    "        \"role\": \"user\",\n",
    "        \"content\": [\n",
    "            {\"type\": \"text\", \"text\": prompt},\n",
    "            {\n",
    "                \"type\": \"image\",\n",
    "                \"source_type\": \"base64\",\n",
    "                \"data\": image_base64,\n",
    "                \"mime_type\": \"image/png\",\n",
    "            },\n",
    "        ],\n",
    "    }\n",
    "]\n",
    "\n",
    "\n",
    "try:\n",
    "    llm = init_chat_model(\"gpt-4.1-mini\")\n",
    "    llm = llm.with_structured_output(Dashboard)\n",
    "    dashboard_values = llm.invoke(message)\n",
    "\n",
    "    print(\"Dashboard values generated by the LLM successfully\")\n",
    "    print(dashboard_values)\n",
    "except Exception as e:\n",
    "    print(f\"Failed to analyze image and match fields: {str(e)}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Filling the template with the values generated by the LLM:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 116,
   "metadata": {},
   "outputs": [],
   "source": [
    "def fill_template_with_analysis(\n",
    "    template: Dict[str, Any],\n",
    "    visualization: Visualization,\n",
    "    grid_data: Dict[str, Any],\n",
    "):\n",
    "    template_str = json.dumps(template)\n",
    "    replacements = {\n",
    "        \"{visualization_id}\": str(uuid.uuid4()),\n",
    "        \"{title}\": visualization.title,\n",
    "        \"{x}\": grid_data[\"x\"],\n",
    "        \"{y}\": grid_data[\"y\"],\n",
    "    }\n",
    "\n",
    "    if visualization.field:\n",
    "        replacements[\"{field}\"] = visualization.field\n",
    "\n",
    "    for placeholder, value in replacements.items():\n",
    "        template_str = template_str.replace(placeholder, str(value))\n",
    "\n",
    "    return json.loads(template_str)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "panels = []\n",
    "grid_data = [\n",
    "    {\"x\": 0, \"y\": 0},\n",
    "    {\"x\": 12, \"y\": 0},\n",
    "    {\"x\": 0, \"y\": 12},\n",
    "    {\"x\": 12, \"y\": 12},\n",
    "]\n",
    "\n",
    "i = 0\n",
    "\n",
    "for vis in dashboard_values.visualizations:\n",
    "    for vis_type in vis.type:\n",
    "        template = templates.get(vis_type, templates.get(\"bar\", {}))\n",
    "        filled_panel = fill_template_with_analysis(template, vis, grid_data[i])\n",
    "        panels.append(filled_panel)\n",
    "        i += 1"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Generate the dashboard\n",
    "\n",
    "Here is called the API `/api/generate-dashboard`. The templates with the values generated by the LLM are sent to the API."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    dashboard_id = str(uuid.uuid4())\n",
    "\n",
    "    # post request to create the dashboard endpoint\n",
    "    url = f\"{os.getenv('KIBANA_URL')}/api/dashboards/dashboard/{dashboard_id}\"\n",
    "\n",
    "    dashboard_config = {\n",
    "        \"attributes\": {\n",
    "            \"title\": dashboard_values.title,\n",
    "            \"description\": \"Generated by AI\",\n",
    "            \"timeRestore\": True,\n",
    "            \"panels\": panels,  # Visualizations with the values generated by the LLM\n",
    "            \"timeFrom\": \"now-7d/d\",\n",
    "            \"timeTo\": \"now\",\n",
    "        },\n",
    "    }\n",
    "\n",
    "    headers = {\n",
    "        \"Content-Type\": \"application/json\",\n",
    "        \"kbn-xsrf\": \"true\",\n",
    "        \"Authorization\": f\"ApiKey {os.getenv('ELASTICSEARCH_API_KEY')}\",\n",
    "    }\n",
    "\n",
    "    requests.post(\n",
    "        url,\n",
    "        headers=headers,\n",
    "        json=dashboard_config,\n",
    "    )\n",
    "\n",
    "    # Url to the generated dashboard\n",
    "    dashboard_url = f\"{os.getenv('KIBANA_URL')}/app/dashboards#/view/{dashboard_id}\"\n",
    "\n",
    "    print(\"Dashboard URL: \", dashboard_url)\n",
    "    print(\"Dashboard ID: \", dashboard_id)\n",
    "\n",
    "except Exception as e:\n",
    "    print(f\"Failed to create dashboard: {str(e)}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Generating a dashboard image from Kibana"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "screenshot_height = 680\n",
    "screenshot_width = 1418\n",
    "\n",
    "job_params = (\n",
    "    f\"(browserTimezone:America/Panama,\"\n",
    "    f\"layout:(dimensions:(height:{screenshot_height},width:{screenshot_width}),id:preserve_layout),\"\n",
    "    f\"locatorParams:(id:DASHBOARD_APP_LOCATOR,params:(dashboardId:'{dashboard_id}',\"\n",
    "    f\"preserveSavedFilters:!t,\"\n",
    "    f\"timeRange:(from:now-7d/d,to:now),\"\n",
    "    f\"useHash:!f,viewMode:view)),\"\n",
    "    f\"objectType:dashboard,\"\n",
    "    f\"title:'{dashboard_values.title}',\"\n",
    "    f\"version:'9.0.0')\"\n",
    ")\n",
    "\n",
    "# Creating job report request\n",
    "job_params_str = urllib.parse.quote(job_params)\n",
    "\n",
    "url = (\n",
    "    f\"{os.getenv('KIBANA_URL')}/api/reporting/generate/pngV2?jobParams={job_params_str}\"\n",
    ")\n",
    "headers = {\n",
    "    \"kbn-xsrf\": \"true\",\n",
    "    \"Authorization\": f\"ApiKey {os.getenv('ELASTICSEARCH_API_KEY')}\",\n",
    "}\n",
    "\n",
    "r = requests.post(url, headers=headers)\n",
    "\n",
    "if r.status_code != 200:\n",
    "    raise Exception(\"Failed to start report job\")\n",
    "\n",
    "job_url = f\"{os.getenv('KIBANA_URL')}{r.json()['path']}\"\n",
    "\n",
    "\n",
    "while True:\n",
    "    resp = requests.get(job_url, headers=headers)\n",
    "\n",
    "    print(resp.status_code)\n",
    "    if resp.status_code == 200:\n",
    "        with open(\"dashboard.png\", \"wb\") as f:\n",
    "            f.write(resp.content)\n",
    "\n",
    "        print(\"PNG downloaded as dashboard.png\")\n",
    "        display(Image(\"dashboard.png\"))\n",
    "        break\n",
    "\n",
    "    retry_after = resp.headers.get(\"Retry-After\")\n",
    "\n",
    "    if retry_after:\n",
    "        print(f\"Waiting {retry_after} seconds before retrying...\")\n",
    "        time.sleep(int(retry_after))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.13.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
